[Data] Cap op concurrency with exponential ramp-up #40275
Conversation
Force-pushed from 514a0af to 4ff0a9c
Signed-off-by: Hao Chen <chenh1024@gmail.com>
Force-pushed from 4ff0a9c to fc14b94
LG overall.
# Environment variable to configure this policy.
# The format is: "<init_cap>,<cap_multiply_threshold>,<cap_multiplier>"
CONFIG_ENV_VAR = "RAY_DATA_CONCURRENCY_CAP_CONFIG"
This is not a blocking comment, but it would be better to have separate environment variables.
This is to facilitate internal tests in 2.9. When we officially release this feature, we will probably simplify the configs. So I'll keep it for now.
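For reference, given the format described in the snippet above, supplying the policy configuration might look like this (a sketch; the values shown are just the documented defaults, and the variable must be visible to the process that actually runs the executor, per the discussion below):

```python
import os

# Sketch only. Format: "<init_cap>,<cap_multiply_threshold>,<cap_multiplier>"
os.environ["RAY_DATA_CONCURRENCY_CAP_CONFIG"] = "4,0.5,2.0"
```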
# The initial concurrency cap for each operator.
INIT_CAP = 4
# When the number of finished tasks reaches this threshold, the concurrency cap
# will be multiplied by the multiplier.
CAP_MULTIPLY_THRESHOLD = 0.5
# The multiplier to multiply the concurrency cap by.
CAP_MULTIPLIER = 2.0
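To make the ramp-up concrete, here is a small sketch of the arithmetic implied by these defaults, assuming the threshold is interpreted as a fraction of the current cap (not the actual policy code):

```python
INIT_CAP = 4
CAP_MULTIPLY_THRESHOLD = 0.5
CAP_MULTIPLIER = 2.0

def current_cap(num_finished_tasks: int) -> int:
    """Illustrative only: derive the concurrency cap from the finished-task count."""
    cap = INIT_CAP
    # Each time the number of finished tasks reaches threshold * cap,
    # the cap is multiplied by the multiplier.
    while num_finished_tasks >= cap * CAP_MULTIPLY_THRESHOLD:
        cap = int(cap * CAP_MULTIPLIER)
    return cap

# Progression: 0 or 1 finished -> 4; 2 finished -> 8; 4 finished -> 16; 8 finished -> 32.
```

In other words, under this interpretation the cap roughly doubles every time about half of the currently allowed tasks have finished.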
Another way is to define these constants at the file level, so we don't need to parse environment variables. It may also be easier for advanced users to test out.
The issue with constants is that if the executor doesn't run on the driver (e.g., on the SplitCoordinator actor), it's hard to change the configs. I've seen this issue for other configs that depend on constants. Do you know a good solution to bypass this issue?
I think you'll need to put them into the DataContext.
@stephanie-wang @c21 If we put them into DataContext, I'd like to save them in a dict and add a key-value interface in DataContext.
The reason is that this is a plugin, and DataContext shouldn't need to know about the plugin configs.
What do you think?
The API will be something like:
data_context.set("concurrency_cap_backpressure_policy.init_cap", 4)
That sounds good to me for now while it's still experimental. Actually, could you prepend the name with "experimental" or something like that? Makes deprecation a bit smoother.
# TODO(hchen): Enable ConcurrencyCapBackpressurePolicy by default.
DEFAULT_BACKPRESSURE_POLICIES = []
So do we foresee that, in the near future, we will have multiple policies enabled at the same time?
I think it is too early to abstract backpressure policies, considering we only have one right now. Can we just put the new concurrency caps inside the current backpressure code under a feature flag?
I do plan to migrate existing backpressure code to this interface.
Another reason why I wanted to introduce this interface is that it will allow us to experiment with new backpressure policies without touching the code base. E.g., one idea is to take real runtime metrics into consideration for backpressure.
Sounds good, I don't feel strongly about it, just think that it may be too early to abstract (we don't know yet if this is the right interface).
Yes, the interface is likely to change as we implement other policies. It's an internal interface, so that should be fine.
I'm fine merging if we turn it off by default, but I don't think this policy is going to be usable. The performance regression is scary, and the configuration parameters here are going to be pretty confusing for the average user, I think. Also, off the top of my head, there are likely many cases that won't work or will regress (e.g., concurrency=4 cap may or may not work depending on num_cpus/task and num_cpus total).
Offline, let's try to come up with a policy that minimizes regressions and has as few configuration parameters as possible, ideally none. I believe we can come up with something reasonable by considering the total number of operators when assigning the initial caps. For example, if we have N operators to run, we give each operator 1/N of the cores initially, then adjust based on which operators are ready.
Also, I think this PR needs some doc changes to instruct on how to turn on the feature and configure it.
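As an illustration of the 1/N suggestion above (not something this PR implements), the initial caps could be derived from the operator count and the cluster size roughly like this; the `total_cpus` argument and the floor of one task are assumptions:

```python
def initial_caps(num_operators: int, total_cpus: int) -> list:
    # Give each operator roughly 1/N of the cores to start, with a floor of
    # one concurrent task, then adjust at runtime based on which operators
    # actually have inputs ready.
    per_op = max(1, total_cpus // num_operators)
    return [per_op] * num_operators

# e.g., 3 operators on a 16-CPU cluster -> initial caps [5, 5, 5]
```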
python/ray/data/_internal/execution/streaming_executor_state.py (outdated; resolved)
@stephanie-wang You are right. The current default config isn't optimal. I also thought of making the default config smarter by taking into consideration the number of ops and their resource requirements. The plan is to implement the basic framework and turn this off in 2.8. In 2.9, we'll need to do more experiments to figure out the best configuration and officially release this feature.
Signed-off-by: Hao Chen <chenh1024@gmail.com>
@c21 @stephanie-wang thanks for your comments. They have either been addressed (resolved threads) or replied to (unresolved threads). Please take a look again.
Let's merge after making it configurable through DataContext instead of the env variables.
Signed-off-by: Hao Chen <chenh1024@gmail.com>
Moved the config to DataContext and added the …
LG w/ two minor comments.
python/ray/data/context.py (outdated)
@@ -219,6 +219,7 @@ def __init__(
        self.enable_get_object_locations_for_metrics = (
            enable_get_object_locations_for_metrics
        )
        self._plugin_configs: Dict[str, Any] = {}
Can we rename it as `_backpressure_plugin_configs`? In the future, we may introduce other plugin components.
python/ray/data/context.py (outdated)
    def get_plugin_config(self, key: str, default: Any = None) -> Any:
        return self._plugin_configs.get(key, default)

    def set_plugin_config(self, key: str, value: Any) -> None:
        self._plugin_configs[key] = value

    def remove_plugin_config(self, key: str) -> None:
        self._plugin_configs.pop(key, None)
Let's declare these methods with a `_` prefix, so they remain private and easy to change later.
Discussed offline, we'll make this API reusable for other components.
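For illustration, using the key-value interface from the snippet above might look like the following. This is a sketch based on the method names shown in this diff; the key strings are hypothetical, and the final method names may change per the discussion:

```python
from ray.data import DataContext

ctx = DataContext.get_current()

# Hypothetical config keys following the naming discussed earlier in this thread.
ctx.set_plugin_config("concurrency_cap_backpressure_policy.init_cap", 4)
ctx.set_plugin_config("concurrency_cap_backpressure_policy.cap_multiplier", 2.0)

init_cap = ctx.get_plugin_config(
    "concurrency_cap_backpressure_policy.init_cap", default=4
)

# Remove an override when it is no longer needed.
ctx.remove_plugin_config("concurrency_cap_backpressure_policy.cap_multiplier")
```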
Why are these changes needed?

Today when a new dataset is launched, the StreamingExecutor will allocate all resources to the first operator. Consider a simple case `read -> map` where the read and map are not fused: the scheduler will submit as many read tasks as possible up front. Assuming the read tasks output many blocks, the blocks will pile up because we don't have resources to submit map tasks to consume the data.

This PR tries to mitigate this issue by capping the concurrency of each op with an initial value and ramping up the cap exponentially as the execution goes on (see the `ConcurrencyCapBackpressurePolicy` docstring for details).

This PR also introduces a `BackpressurePolicy` interface, making the backpressure policies configurable and pluggable. Later we should migrate existing backpressure mechanisms to this new interface.

Known limitations:

- If the config is not properly set, perf may regress for some workloads. Thus we disable this feature by default in 2.8 and will enable it in 2.9.
- This feature only caps the initial concurrency. Once the cap has ramped up, data can still pile up. This issue will be resolved by a different backpressure policy that profiles the runtime metrics (e.g., object store increase, RAM usage) of each op.
- It doesn't backpressure tasks that output many blocks. This issue will be solved by streaming generator backpressure instead.
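To give a rough idea of what a pluggable policy could look like under such an interface, here is a minimal sketch. It is not the actual `BackpressurePolicy` interface from this PR; the method names (`can_add_input`, `num_tasks_finished`, `num_tasks_running`) are assumptions for illustration:

```python
from abc import ABC, abstractmethod


class BackpressurePolicy(ABC):
    """Sketch of a pluggable backpressure policy (names are assumptions)."""

    @abstractmethod
    def can_add_input(self, op) -> bool:
        """Return True if the operator may launch another task right now."""


class ConcurrencyCapPolicy(BackpressurePolicy):
    """Caps per-operator concurrency and ramps the cap up exponentially."""

    def __init__(self, init_cap=4, threshold=0.5, multiplier=2.0):
        self._caps = {}
        self._init_cap = init_cap
        self._threshold = threshold
        self._multiplier = multiplier

    def can_add_input(self, op) -> bool:
        cap = self._caps.setdefault(op, self._init_cap)
        # Ramp up: once enough of this operator's tasks have finished,
        # multiply the cap.
        while op.num_tasks_finished() >= cap * self._threshold:
            cap = int(cap * self._multiplier)
        self._caps[op] = cap
        return op.num_tasks_running() < cap
```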
Related issue number
Checks
- I've signed off every commit (by using the -s flag, i.e., `git commit -s`) in this PR.
- I've run `scripts/format.sh` to lint the changes in this PR.
- I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file.